简单版的 Token认证
# auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from api.models import *
class TokenAuth(BaseAuthentication):
def authenticate(self, request):
# token = request.data.get('token') # 从请求体中获取token参数
token = request.META.get('HTTP_AUTHORIZATION') # 从请求头中获取token参数
token_obj = UserToken.objects.filter(token=token).first()
if not token_obj:
raise AuthenticationFailed({'code': 0, 'error': '认证失败'})
# return '', None
return token_obj.user, token_obj
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from api.models import *
from api.utils.auth import TokenAuth
import uuid
import datetime
# 初始化返回值
class BaseResponse:
def __init__(self):
self.code = ''
self.status = False
self.data = None
self.msg = ''
@property
def dic(self):
return self.__dict__
# 登陆,写入token
class LoginView(APIView):
def post(self, request, *args, **kwargs):
res = BaseResponse()
try:
username = request.data.get('username')
password = request.data.get('password')
user = UserInfo.objects.filter(username=username, password=password).first()
if user:
uid = uuid.uuid4()
UserToken.objects.update_or_create(user=user, defaults={'token': uid, 'created': datetime.datetime.now()})
res.status = True
res.data = {
'token': uid
}
res.msg = '登陆成功'
except Exception as e:
res.code = 1000
res.msg = '登陆失败'
return Response(res.dic)
# 测试认证
class IndexView(APIView):
authentication_classes = [TokenAuth]
def get(self, request, *args, **kwargs):
res = BaseResponse()
res.status = True
res.data = {'username': request.user.username}
return Response(res.dic)
# models.py
from django.db import models
class UserInfo(models.Model):
"""
用户表
"""
username = models.CharField(verbose_name='用户名', max_length=32)
password = models.CharField(verbose_name='密码', max_length=32)
class Meta:
verbose_name = "用户表"
verbose_name_plural = verbose_name
def __str__(self):
return self.username
class UserToken(models.Model):
"""
用户token表
"""
user = models.OneToOneField(to='UserInfo')
token = models.CharField(verbose_name='token', max_length=64)
created = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
class Meta:
verbose_name = "用户Token表"
verbose_name_plural = verbose_name
复杂版的 Token认证
- 功能描述: 缓存token(即: 不用每次从数据库中获取token进行比较) + 超时token
# auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from api.models import *
from django.core.cache import cache
import datetime
import pytz
class TokenAuth(BaseAuthentication):
def authenticate(self, request):
# token = request.data.get('token') # 从请求体中获取token参数
token = request.META.get('HTTP_AUTHORIZATION') # 从请求头中获取token参数
# 从缓存中获取以token为key的缓存内容
user = cache.get(token)
if user:
return user, token
# 数据库验证:如果缓存中没有token,那么就使用数据库验证
token_obj = UserToken.objects.filter(token=token).first()
if not token_obj:
raise AuthenticationFailed({'code': 0, 'error': '认证失败'})
# return '', None
# 验证token是否在有效期内
now_time = datetime.datetime.now()
now_time = now_time.replace(tzinfo=pytz.timezone('UTC')) # 因为.now()默认获取是不带时区的时间,而从数据库中获取的是带时区的时间所以要给.now()添加上时区
create_time = token_obj.created
delta = now_time - create_time # 相差时间
if delta < datetime.timedelta(weeks=2): # 当前时间是否小于两周的时间
# 校验成功,写入数据库
delta = datetime.timedelta(weeks=2) - delta # token 剩余时间
cache.set(token_obj.token, token_obj.user, min(delta.total_seconds(), 60 * 60 * 24 * 7)) # delta.total_seconds():获取实际的秒数,如果剩余时间超过7天,那么就使用7时间,如果没有则使用剩余的时间,这里的时间不能是写死的
else:
raise AuthenticationFailed({'code': 0, 'error': 'token超时'})
return token_obj.user, token_obj
# views.py
# 这里的代码就是简单版的token认证中的 views.py 代码
# models.py
# 这里的代码就是简单版的token认证中的 models.py 代码
JsonWebToken(简称: JWT) Token认证
注意: JsonWebToken 是基于 Django 所提供的 User 表进行操作的
1. 下载模块
pip3 install djangorestframework-jwt -i https://pypi.douban.com/simple # 使用豆瓣的镜像
2. 全局配置
# settings.py
JWT_AUTH = {
'JWT_EXPIRATION_DELTA': datetime.timedelta(days=7), # 过期时间,默认过期时间300秒
'JWT_AUTH_HEADER_PREFIX': 'JWT', # token前缀,默认值 JWT
}
3. jwt所提供的url -> 自动获取、刷新、验证 token
from rest_framework_jwt.views import (
obtain_jwt_token,
verify_jwt_token,
refresh_jwt_token
)
urlpatterns = [
url(r'^login/', obtain_jwt_token), # 登录验证 + 获取token
url(r'^api-token-refresh/', refresh_jwt_token), # 刷新token
url(r'^api-token-verify/', verify_jwt_token), # 验证token
]
4. 手动获取、验证 token
- 其实 obtain_jwt_token 视图类都是调用 JSONWebTokenSerializer 下的 validate 方法进行验证
- 其实 verify_jwt_token 视图类都是调用 VerifyJSONWebTokenSerializer 下的 validate 方法进行验证
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.authentication import BaseAuthentication
from rest_framework_jwt.serializers import (
JSONWebTokenSerializer,
VerifyJSONWebTokenSerializer
)
class LoginView(APIView):
def post(self, request, *args, **kwargs):
req_data = {
'username': request.data.get('username'),
'password': request.data.get('password')
}
# 手动获取 token
jwt = JSONWebTokenSerializer()
user_info = jwt.validate(req_data)
return Response({'token': user_info['token']})
class TokenAuth(BaseAuthentication):
def authenticate(self, request):
http_token = request.META.get('HTTP_TOKEN')
token = {"token": http_token}
# 手动验证 token
vjt = VerifyJSONWebTokenSerializer()
valid_data = vjt.validate(token)
return valid_data['user'], valid_data['token']
class IndexView(APIView):
authentication_classes = [TokenAuth]
def get(self, request, *args, **kwargs):
return Response({
'msg': 'index_info',
'user': request.user.username,
'token': request.auth
})
5. 重写 validate 方法
- 如果直接使用 obtain_jwt_token 视图类验证登陆,那么在登陆失败的时候会返回状态码为400的错误,如果想返回状态码为200的自定义登陆失败信息,那么就要重写 obtain_jwt_token -> JSONWebTokenSerializer -> validate 方法
# login_jwt.py
from rest_framework_jwt.serializers import JSONWebTokenSerializer
from rest_framework import serializers
from django.contrib.auth import authenticate
from django.utils.translation import ugettext as _
from rest_framework_jwt.settings import api_settings
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
class LoginJWT(JSONWebTokenSerializer):
def validate(self, attrs):
credentials = {
self.username_field: attrs.get(self.username_field),
'password': attrs.get('password')
}
if all(credentials.values()):
user = authenticate(**credentials)
if user:
if not user.is_active:
msg = _('User account is disabled.')
raise serializers.ValidationError(msg)
payload = jwt_payload_handler(user)
return {
'token': jwt_encode_handler(payload),
'user': user
}
else:
# 原本的写法
# msg = _('Unable to log in with provided credentials.')
# raise serializers.ValidationError(msg)
raise Exception('登陆失败') # 不使用 serializers.ValidationError 的异常错误
else:
msg = _('Must include "{username_field}" and "password".')
msg = msg.format(username_field=self.username_field)
raise serializers.ValidationError(msg)
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from api.utils.login_jwt import LoginJWT
class LoginView(APIView):
def post(self, request, *args, **kwargs):
try:
req_data = {
'username': request.data.get('username'),
'password': request.data.get('password')
}
# 手动获取 token
jwt = LoginJWT()
user_info = jwt.validate(req_data)
return Response({'token': user_info['token']})
except Exception as e:
return Response({'msg': str(e)})
6. 使用 JsonWebToken 进行 token 认证的例子
# settings.py
JWT_AUTH = {
'JWT_EXPIRATION_DELTA': datetime.timedelta(days=7), # 过期时间,默认过期时间300秒
'JWT_AUTH_HEADER_PREFIX': 'JWT', # token前缀,默认值 JWT
}
# usrls.py
from rest_framework_jwt.views import obtain_jwt_token
urlpatterns = [
url(r'^admin/', admin.site.urls),
url(r'^login/$', obtain_jwt_token),
url(r'^index/$', IndexView.as_view()),
]
# auth.py
from rest_framework.authentication import BaseAuthentication
from rest_framework_jwt.serializers import VerifyJSONWebTokenSerializer
class TokenAuth(BaseAuthentication):
def authenticate(self, request):
http_token = request.META.get('HTTP_TOKEN')
token = {"token": http_token}
# 手动验证 token
vjt = VerifyJSONWebTokenSerializer()
valid_data = vjt.validate(token)
return valid_data['user'], valid_data['token']
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from api.utils.auth import TokenAuth
class IndexView(APIView):
authentication_classes = [TokenAuth]
def get(self, request, *args, **kwargs):
return Response({
'username': request.user.username,
'token': request.auth,
'msg': 'index_info'
})